Multiplex filtering#362
Conversation
|
Refactored the code so that a separate trigger function was no longer needed. The multiplex recipe has been updated so that a new output channel "filtering" is able to trigger xia2.multiplex_filtering. This saves moving parameters between multiplex and multiplex_filtering. |
|
|
||
| # Place holder code for future iterations where may run filtering jobs on clusters | ||
|
|
||
| if cluster_num is not None: |
There was a problem hiding this comment.
I don't think it makes sense to retain cluster logic here (and possibly elsewhere in this wrapper). The flitering job is triggered as a separate job by the multiplex wrapper. With the way the recipe is structured, if you were running filtering on clusters, a separate call of the filtering wrapper would get made for each cluster so you wouldn't need the same logic that loops over and distinguishes between clusters and non-clusters.
There was a problem hiding this comment.
That makes sense - I have removed the cluster logic from ispyb submission and when searching for the output file attachments.
| # ignore filtering parameters for xia2.multiplex_filtering | ||
| ignore = { | ||
| "sample_id", | ||
| "sample_group_id", | ||
| "filtering.method", | ||
| "deltacchalf.stdcutoff", | ||
| "deltacchalf.mode", | ||
| "deltacchalf.group_size", | ||
| } |
There was a problem hiding this comment.
The comment here is misleadingly placed as it implies that sample_id and sample_group_id are flitering parameters.
It is also a bit clunky having to explicitly list parameters to ignore, though I appreciate this is due to how the multiplex job parameters are passed via the trigger service and might be hard to avoid.
| ignore = { | ||
| "sample_id", | ||
| "sample_group_id", | ||
| "data", | ||
| "clustering.method", | ||
| "clustering.output_clusters", | ||
| } |
There was a problem hiding this comment.
Again, it will be a pain in the future if you have to add a parameter here every time you want to pass a new parameter to multiplex and not filtering via the trigger service.
| elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: | ||
| self.log.warning("Timed out waiting for xia2.multiplex files to copy.") | ||
| timedout = True | ||
| waiting = False |
There was a problem hiding this comment.
| elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: | |
| self.log.warning("Timed out waiting for xia2.multiplex files to copy.") | |
| timedout = True | |
| waiting = False | |
| elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: | |
| self.log.error("Timed out waiting for xia2.multiplex files to copy.") | |
| return False |
You can just return False here and remove the need for the timedout variable. Also this should log as an error instead of a warning.
| if not mplx_file.is_file(): | ||
| waiting_processing_files.append(mplx_file) | ||
| self.log.info( | ||
| f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." |
There was a problem hiding this comment.
mplx_file is the complete file path, including multiplex_dir so having both together in this log message is unnecessary. Either just include the file name (i.e. mplx_file.name) along with the directory or give the full file path but not the directory as well.
Logging individually for each file missing is also overkill. You could wait for the loop to finish and log the list of waiting_processing_files instead.
| ntry = 0 | ||
| waiting = True | ||
| timedout = False | ||
| backoff_max_try = 10 | ||
| backoff_multiplier = 2 | ||
| backoff_delay = 8 | ||
| while waiting: | ||
| waiting_processing_files = [] | ||
| for mplx_file in needed_files: | ||
| if not mplx_file.is_file(): | ||
| waiting_processing_files.append(mplx_file) | ||
| self.log.info( | ||
| f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." | ||
| ) | ||
| if len(waiting_processing_files) > 0 and ntry < backoff_max_try: | ||
| delay = int(backoff_delay * backoff_multiplier**ntry) | ||
| time.sleep(delay) | ||
| ntry += 1 | ||
| elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: | ||
| self.log.warning("Timed out waiting for xia2.multiplex files to copy.") | ||
| timedout = True | ||
| waiting = False | ||
| else: | ||
| self.log.info("All files present for xia2.multiplex_filtering") | ||
| waiting = False | ||
|
|
There was a problem hiding this comment.
It might be better to delegate all of this logic to the FileWatcher service. This way, you wouldn't be tying up resources on the cluster waiting for files to be copied across.
xia2.multiplexhas a filtering option built in which can greatly improve data reduction quality. VMXm, in particular, always manually reprocess datasets with xia2.multiplex to turn on these filtering parameters. Therefore, it would be nice to include this as a part of the auto processing infrastructure.The issue has always been that the filtering can be slow, and this can impede rapid feedback. In xia2, we recently made a new command line program,
xia2.multiplex_filtering. This performs the same filtering algorithms on a completed multiplex job. By breaking the algorithm into two separate programs, this would allow for rapid feedback as well as providing a filtered mtz later. This PR attempts to provide trigger/wrappers for such a filtering pipeline.The cluster number is passed through from
multiplextomultiplex_filteringto ensure that it is not triggered on clusters (possible implementation for clusters in the future, but would need slightly different triggering requirements).As this pipeline relies on a finished multiplex directory (specific files needed that are not user-interesting), checks are done to make sure data is available where expected. This is done using the same delay multipliers as multiplex.
The sample group information is also passed through from multiplex. This is important, as there can be multiple sample groups related to a single DCID. Multiplex also passes through the actual DCID's it used in processing. This is also important, as the stored list of related DCID's can include both rotation/grid scans or other datasets that should not be used. Given all the relevant queries are already done in the multiplex trigger, it seemed easiest to pass these through rather than repeating all these queries.
The filtering itself is set to
image_groupmode, which means all the images are grouped into batches and a deltacchalf algorithm is used to see if any of these batches do not correlate well with the rest of the data. A group size of 50 is set as default, as this corresponds to 5deg rotation (following standard 0.1 deg fine slicing). However, VMXm have had success using a group size of 10, so they have this specified for their beamline.General intent here is to test on VMXm first via staging, then roll it out live just for VMXm initially. This will be useful stress testing prior to deployment on other beam lines. Eventually, it is expected that this is triggered on all beam lines after multiplex.
NOTE: will need dials/latest to run -> this includes
xia2.multiplex_filteringbug fixes which are not in the latest release.